package mq

import (
	"context"
	"fmt"

	"gitee.com/linxing_3/youye-core/logger"
	"gitee.com/linxing_3/youye-core/sdk/application"
	"gitee.com/linxing_3/youye-core/sdk/pkg/structs"
	"gitee.com/linxing_3/youye-core/sdk/pkg/yerr"

	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/broker/nats"
	"github.com/tx7do/kratos-transport/broker/rabbitmq"
)

var _ application.IPlugin = (*MQPlugin)(nil)

const MQPluginKey = "MQPlugin"

func GetOrInit(app application.IApplication, opts ...*Option) (IMq, error) {
	plugin, ok := app.GetPlugin(MQPluginKey)
	if ok {
		return plugin.(IMq), nil
	}

	if len(opts) == 0 {
		return nil, yerr.New(yerr.ErrCodeBiz).WithMsg("mq plugin init failed : option is nil")
	}
	plugin = NewMQPlugin(opts[0])
	if err := plugin.Install(context.Background(), app); err != nil {
		return nil, err
	}
	return plugin.(IMq), app.AddPlugin(plugin)
}

func MustGet(app application.IApplication) *MQPlugin {
	plugin, ok := app.GetPlugin(MQPluginKey)
	if !ok {
		panic(fmt.Errorf("plugin %s not exists", MQPluginKey))
	}
	mq, ok := plugin.(*MQPlugin)
	if !ok {
		panic(fmt.Errorf("plugin %v is not type of *MQPlugin", mq))
	}
	return mq
}

type IMq interface {
	Subscribe(route string, handler broker.Handler) error
	Publish(ctx context.Context, route string, msg any) error
}

func NewMQPlugin(option *Option) application.IPlugin {

	return &MQPlugin{
		option: option,
	}
}

func initBroker(ctx context.Context, opt *Option) broker.Broker {
	switch opt.Engine {
	case EngineRabbitMQ:
		return rabbitmq.NewBroker(
			broker.WithAddress(opt.Dsn()),
			broker.WithOptionContext(ctx),
			broker.WithCodec("json"),
			rabbitmq.WithDurableExchange(),
			rabbitmq.WithExchangeName(opt.Exchange),
			rabbitmq.WithExchangeType("direct"),
		)
	case EngineNats:
		return nats.NewBroker(
			broker.WithAddress(opt.Dsn()),
			broker.WithOptionContext(ctx),
			broker.WithCodec("json"),
		)
	default:
		return nil
	}
}

type MQPlugin struct {
	option *Option
	broker broker.Broker
	log    *logger.Helper
}

// Install implements application.IPlugin.
func (r *MQPlugin) Install(ctx context.Context, core application.IApplication) error {
	r.broker = initBroker(ctx, r.option)
	if r.broker == nil {
		return yerr.New(yerr.ErrCodeBiz).WithMsg("mq plugin init failed : broker is nil")
	} else if err := r.broker.Init(); err != nil {
		return err
	} else if err := r.broker.Connect(); err != nil {
		return err
	}
	r.log = logger.NewHelper(core.GetLogger()).WithField("module", "mq")
	return nil
}

// Key implements application.IPlugin.
func (r *MQPlugin) Key() string {
	return MQPluginKey
}

// Uninstall implements application.IPlugin.
func (r *MQPlugin) Uninstall(application.IApplication) error {
	return r.broker.Disconnect()
}

func (r *MQPlugin) Subscribe(route string, handler broker.Handler) error {
	_, err := r.broker.Subscribe(route, func(ctx context.Context, evt broker.Event) error {
		defer func() {
			if err := recover(); err != nil {
				// fmt.Println("rabbit subscribe panic: ", err)
				r.log.WithField("route", route).Errorf("subscribe handle error:", err)
			}
		}()
		if err := handler(ctx, evt); err != nil {
			r.log.WithField("route", route).Errorf("subscribe handle error: %s", err)
			return err
		}
		return nil
	}, nil)
	if err != nil {
		r.log.WithField("route", route).Errorf("subscribe error: %s", err)
	}
	return err
}

func (r *MQPlugin) Publish(ctx context.Context, route string, msg any) error {
	if err := r.broker.Publish(ctx, route, msg); err != nil {
		r.log.WithField("route", route).WithFields(structs.ToMap(msg)).Errorf("publish error: %s", err)
		return err
	} else {
		defer func() {
			r.log.WithField("rabbitmq", route).WithFields(structs.ToMap(msg)).Info("publish success")
		}()
	}
	return nil
}
